1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.net.client.clientmanger;
12 
13 import std.socket;
14 
15 import kiss.exception;
16 import kiss.event;
17 import kiss.util.timer;
18 import kiss.net.TcpStream;
19 import collie.net.client.linklogInfo;
20 import collie.net.client.exception;
21 
22 import kiss.event.timer.common;
23 import kiss.event.task;
24 import kiss.util.functional;
25 
26 final class TCPClientManger
27 {
28 	alias ClientCreatorCallBack = void delegate(TcpStream);
29 	alias ConCallBack = void delegate(ClientConnection);
30 	alias LinklogInfo = TLinklogInfo!ConCallBack;
31 	alias NewConnection = ClientConnection delegate(TcpStream);
32 
33 	this(EventLoop loop)
34 	{
35 		_loop = loop;
36 	}
37 
38 	void setClientCreatorCallBack(ClientCreatorCallBack cback)
39 	{
40 		_oncreator = cback;
41 	}
42 
43 	void setNewConnectionCallBack(NewConnection cback)
44 	{
45 		_cback = cback;
46 	}
47 
48 	@property eventLoop(){return _loop;}
49 	@property timeout(){return _timeout;}
50 	@property tryCout(){return _tryCout;}
51 	@property tryCout(uint count){_tryCout = count;}
52 
53 	void startTimeout(uint s)
54 	{
55 		if(_wheel !is null)
56 			throw new SocketClientException("TimeOut is runing!");
57 		_timeout = s;
58 		if(_timeout == 0 || _timer)
59 			return;
60 		
61 		uint whileSize;uint time; 
62 		enum int[] fvka = [40,120,600,1000,uint.max];
63 		enum int[] fvkb = [50,60,100,150,300];
64 		foreach(i ; 0..fvka.length ){
65 			if(s <= fvka[i]){
66 				whileSize = fvkb[i];
67 				time = _timeout * 1000 / whileSize;
68 				break;
69 			}
70 		}
71 		
72 		_wheel = new TimingWheel(whileSize);
73 		_timer = new KissTimer(_loop, time);
74 		_timer.onTick(&onTimer);
75 		if(_loop.isInLoopThread()){
76 			_timer.start();
77 		} else {
78 			_loop.postTask(newTask(&_timer.start, false, false));
79 		}
80 	}
81 
82 	void connect(Address addr,ConCallBack cback = null)
83 	{
84 		if(_cback is null)
85 			throw new SocketClientException("must set NewConnection callback ");
86 		LinklogInfo * logInfo = new LinklogInfo();
87 		logInfo.addr = addr;
88 		logInfo.tryCount = 0;
89 		logInfo.cback = cback;
90 		if(_loop.isInLoopThread()){
91 			_postConmnect(logInfo);
92 		} else {
93 			_loop.postTask(newTask(&_postConmnect,logInfo));
94 		}
95 	}
96 
97 	void stopTimer(){
98 		if(_timer) {
99 			_timer.stop();
100 			_timer = null;
101 		}
102 	}
103 
104 protected:
105 	void connect(LinklogInfo * logInfo)
106 	{
107 		logInfo.client = new TcpStream(_loop);
108 		if(_oncreator)
109 			_oncreator(logInfo.client);
110 		logInfo.client.onClosed(&tmpCloseCallBack);
111 		logInfo.client.onConnected(bind(&connectCallBack,logInfo));
112 		// logInfo.client.setReadHandle(&tmpReadCallBack);
113 		logInfo.client.connect(logInfo.addr);
114 	}
115 
116 	void tmpReadCallBack(in ubyte[]) nothrow {}
117 	void tmpCloseCallBack() {}
118 
119 	void connectCallBack(LinklogInfo * logInfo,bool state) 
120 	{
121 		catchAndLogException((){
122 			import std.exception;
123 			if(logInfo is null)return;
124 			if(state) {
125 				scope(exit){
126 					_waitConnect.rmlogInfo(logInfo);
127 				}
128 				ClientConnection con;
129 				collectException(_cback(logInfo.client),con);
130 				if(logInfo.cback)
131 					logInfo.cback(con);
132 				if(con is null) return;
133 				if(_wheel)
134 					_wheel.addNewTimer(con);
135 				con.onActive();
136 			} else {
137 				logInfo.client = null;
138 				if(logInfo.tryCount < _tryCout) {
139 					logInfo.tryCount ++;
140 					connect(logInfo);
141 				} else {
142 					auto cback = logInfo.cback;
143 					_waitConnect.rmlogInfo(logInfo);
144 					if(cback)
145 						cback(null);
146 				}
147 			}
148 		}());
149 	}
150 
151 	void onTimer(Object ){
152 		_wheel.prevWheel();
153 	}
154 
155 private:
156 	final void _postConmnect(LinklogInfo * logInfo){
157 		_waitConnect.addlogInfo(logInfo);
158 		connect(logInfo);
159 	}
160 private:
161 	uint _tryCout = 1;
162 	uint _timeout;
163 
164 	EventLoop _loop;
165 	KissTimer _timer;
166 	TimingWheel _wheel;
167 	TLinkManger!ConCallBack _waitConnect;
168 
169 	NewConnection _cback;
170 	ClientCreatorCallBack _oncreator;
171 }
172 
173 @trusted abstract class ClientConnection : WheelTimer
174 {
175 	this(TcpStream client)
176 	{
177 		resetClient(client);
178 	}
179 
180 	final bool isAlive() @trusted {
181 		return _client && _client.isRegistered;
182 	}
183 
184 	final @property tcpClient()@safe {return _client;}
185 
186 	alias restClient = resetClient;
187 
188 	final void resetClient(TcpStream client) @trusted
189 	{
190 		if(_client !is null){
191 			_client.onClosed(null);
192 			_client.onDataReceived(null);
193 			_client.onConnected(null);
194 			_client = null;
195 		}
196 		if(client !is null){
197 			_client = client;
198 			_loop = cast(EventLoop) client.eventLoop;
199 			_client.onClosed(&doClose);
200 			_client.onDataReceived(&onRead);
201 			_client.onConnected(&tmpConnectCallBack);
202 		}
203 	}
204 
205 	final void write(in ubyte[] data, DataWrittenHandler cback = null) @trusted
206 	{
207 		write(new SocketStreamBuffer(data,cback));
208 	}
209 
210 	final void write(StreamWriteBuffer buffer) @trusted
211     {
212         if (_loop.isInLoopThread()) {
213             _postWriteBuffer(buffer);
214         } else {
215             _loop.postTask(newTask(&_postWriteBuffer, buffer));
216         }
217     }
218 
219 	final void restTimeout() @trusted
220 	{
221 		if(_loop.isInLoopThread()){
222 			rest();
223 		} else {
224 			_loop.postTask(newTask(&rest,0));
225 		}
226 	}
227 
228 	pragma(inline)
229 	final void close() @trusted
230 	{
231 		_loop.postTask(newTask(&_postClose));
232 	}
233 protected:
234 	void onActive() nothrow;
235 	void onClose() nothrow;
236 	void onRead(in ubyte[] data) nothrow;
237 private:
238 	final void tmpConnectCallBack(bool) nothrow{}
239 	final void doClose() @trusted nothrow
240 	{
241 		catchAndLogException((){
242 			stop();
243 			onClose();
244 		}());
245 	}
246 
247 	final void _postClose(){
248 		if(_client)
249 			_client.close();
250 	}
251 
252     final void _postWriteBuffer(StreamWriteBuffer buffer)
253     {
254         if (_client) {
255             rest();
256             _client.write(buffer);
257         } else
258             buffer.doFinish();
259     }
260 
261 private:
262 	TcpStream _client;
263 	EventLoop _loop;
264 }